package org.databandtech.flink.demo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Map 转换操作，map操作用于1 vs 1输出，输出的数量和输入是一致的。
 * @author Administrator
 *
 */
public class ElementsMap {

	public static void main(String[] args) {
		
	    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		try {
			listSource(env);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	private static void listSource(StreamExecutionEnvironment env) throws Exception {

		DataStreamSource<Integer> dataSetElements = env.fromElements(1, 2, -3, 0, 5, -9, 8);
		dataSetElements.print();
		
		SingleOutputStreamOperator<String> dataStream = dataSetElements.map(new MapTransformation());
		dataStream.print();
		env.execute("Flink Transformation");
	}
	
	public static class MapTransformation implements MapFunction<Integer,String> {

		private static final long serialVersionUID = -5913581045622029628L;

		public String map(Integer in) throws Exception {
			Integer result = in*2;
			return "计算："+in+"-结果："+result;
		}
	}
}
